package com.lsx143.wordcount.exec;

import com.lsx143.wordcount.bean.WaterSensor;
import com.lsx143.wordcount.day2.RandomWaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class Test_Timer_watermark {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 20001);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        DataStreamSource<WaterSensor> dss = env.addSource(new RandomWaterSensor(RandomWaterSensor.RUN_MODE.INCREASE));

        WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1


                ))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs());
        dss
                .assignTimestampsAndWatermarks(wms)
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        long triggerTime = value.getTs() + 5000L;
                        System.out.println("当前ts=" + value.getTs() + " 设置触发器时间=" + triggerTime);
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000);
//                        out.collect(value.toString());
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        System.err.println(timestamp + "--" + "timestamp = " + ctx.timestamp() + " 定时器被触发.....");
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}